package com.desheng.bigdata.flink.stream.sink

import java.io.IOException
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object _03Sink2JDBCOps {
    def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val lines = env.fromCollection(List(
            "hello you",
            "hello me",
            "hello she"
        ))


        val pairs: DataStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
            .map((_, 1))
            .keyBy(0)
            .sum(1)

        pairs.addSink(new MyJDBCSink())

        env.execute()
    }
}

class MyJDBCSink extends RichSinkFunction[(String, Int)] {
    var connection: Connection = null
    var ps: PreparedStatement = null
    override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "sorry")
//        val sql =
//            """
//              |insert into words(word, count)
//              | values(?, ?)
//            """.stripMargin
        val sql =
            """
              |INSERT INTO words (word, `count`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `count` = ?
            """.stripMargin
        ps = connection.prepareStatement(sql)
    }

    override def invoke(kv: (String, Int), context: SinkFunction.Context[_]): Unit = {

        ps.setString(1, kv._1)
        ps.setInt(2, kv._2)
        ps.setInt(3, kv._2)
        ps.execute()
    }

    override def close(): Unit = {
        try {
            if(ps != null) {
                ps.close()
            }
        } catch {
            case e: IOException => e.printStackTrace()
            case _ => {}
        } finally {
            try {
                if(connection != null) {
                    connection.close()
                }
            } catch {
                case e: IOException => e.printStackTrace()
                case _ => {}
            }
        }
    }
}
